package org.fjsei.yewu.resolver;

import com.alibaba.fastjson2.JSON;
import com.querydsl.core.BooleanBuilder;
import com.querydsl.core.types.dsl.ComparablePath;
import lombok.extern.slf4j.Slf4j;
import md.cm.unit.Unit;
import md.cm.unit.UnitPi;
import md.specialEqp.Eqp;
import md.specialEqp.inspect.*;
import md.specialEqp.type.PipingUnit;
import org.fjsei.yewu.entity.fjtj.UntMge;
import org.fjsei.yewu.entity.fjtj.UntMgeRepository;
import org.fjsei.yewu.entity.incp.JcPermtUnt;
import org.fjsei.yewu.entity.incp.JcPermtUntRepository;
import org.fjsei.yewu.entity.incp.JcUntMge;
import org.fjsei.yewu.entity.incp.JcUntMgeRepository;
import org.fjsei.yewu.filter.Node;
import org.fjsei.yewu.index.EqpEs;
import org.fjsei.yewu.index.PipingUnitEs;
import org.fjsei.yewu.jpa.*;
import org.fjsei.yewu.pojo.SliceSyncRes;
import org.fjsei.yewu.repository.maint.JsliceMang;
import org.jgroups.util.Triple;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.Pageable;
import org.springframework.data.domain.Slice;
import org.springframework.data.domain.Sort;
import org.springframework.data.elasticsearch.client.elc.NativeQuery;
import org.springframework.data.elasticsearch.core.ElasticsearchOperations;
//import org.springframework.data.elasticsearch.core.ElasticsearchRestTemplate;
import org.springframework.data.elasticsearch.core.SearchHit;
import org.springframework.data.elasticsearch.core.SearchHitSupport;
import org.springframework.data.elasticsearch.core.SearchHits;
import org.springframework.data.elasticsearch.core.mapping.IndexCoordinates;
//import org.springframework.data.elasticsearch.core.query.NativeSearchQuery;
//import org.springframework.data.elasticsearch.core.query.NativeSearchQueryBuilder;
import org.springframework.data.elasticsearch.core.query.Query;
import org.springframework.data.jpa.domain.Specification;
import org.springframework.data.repository.CrudRepository;
import org.springframework.data.repository.query.FluentQuery;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;

import jakarta.persistence.criteria.*;
import java.time.LocalDateTime;
import java.util.*;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;

import static graphql.Assert.assertTrue;

/**
 * 维护用的 公用部分
 * 派生类必须支持@Autowired逻辑。
 * 不要在graphql模型文件 定义接口 checkAuth: User，报错;
*/
@Slf4j
public class MaintComn extends Comngrql {
    @Autowired  protected ElasticsearchOperations esTemplate;
    @Autowired  protected JcPermtUntRepository jcPermtUntRepository;
    //@Autowired  private Units units;
    @Autowired  protected UntMgeRepository untMgeRepository;
    @Autowired  protected JcUntMgeRepository jcUntMgeRepository;
    /**公众聚集场所 {id:'1',"公众聚集场所（学校）"},
     * 最后一个 {id:'11',text:'住宅'} 没啥意义的数据？干脆忽视掉，同步时也抛弃重置。*/
    private  String[]  plclsArr={"学校","幼儿园","医院","车站","客运码头","商场","体育场馆","展览馆","公园","其它公众聚集场所"};


    /**若 usejc=true 从监察单位永久表，否则从检验平台单位id来映射Unit
     * 监察ALT_UNT_ID来自JC_PMT_UNT的；
     * */
    public Unit getUnitfromOldUnitId(Boolean usejc, Long id) {
        if(null==id)    return null;
        return getUnitfromOldUnitId(usejc,id,true);
    }
    /**根据旧平台单位ID映射本平台的Unit;
     * usejc:是否监察的；ofPmt：是否监察单位永久表。
     * 有些单位注销：id变了，可是公司名字个人身份证还是能够定位的？
     * @param usejc:作废了！ 不用监察的
     * */
    public Unit getUnitfromOldUnitId(Boolean usejc, Long id, Boolean ofPmt) {
        if(null==id)    return null;
        Unit aunit=null;
        if(usejc) {  //监察平台
            if(ofPmt) {     //从PMT永久单位表找
                JcPermtUnt jcPermtUnt = jcPermtUntRepository.findById(id).orElse(null);
                if (null != jcPermtUnt && null != jcPermtUnt.getUunit()) {
                    aunit = units.findById(jcPermtUnt.getUunit()).orElse(null);
                    //if(null==aunit)   log.info("没单位 id {},jc", id);
                }
                else if (null == jcPermtUnt){
                    if(id!=205145L)
                        log.info("PMT找不到id {}", id);
                }
                //else log.info("单位id {},PMT没映射Unit", id); 主要是监察端用户随意输入的单位名称！并非预先准备好了去选择一个单位。
                //【大量出现】PMT找不到id 205145 ，WZ00000/0000000100；=假货！。205145来自jc_unt_mge
            }else{   //福建监察平台对应id：从普通单位表查
                //@不能使用jcUntMgeRepository.getById(id); 会直接抛出异常jakarta.persistence.EntityNotFoundException并不是返回null的实体！！
                JcUntMge jcUntMge=jcUntMgeRepository.findById(id).orElse(null);
                if(null != jcUntMge  && null!= jcUntMge.getUunit())
                    aunit = units.findById(jcUntMge.getUunit()).orElse(null);
            }
        }
        else {  //旧检验平台
            UntMge untMge=untMgeRepository.findById(id).orElse(null);
            if(null !=untMge && null!= untMge.getUunit())
                aunit = units.findById(untMge.getUunit()).orElse(null);
            //if(null==aunit)     log.info("没单位 id {},jy", id);
        }
        return aunit;
    }
    //获取单位映射的失败信息：
    public String getFailOldUnitId映射(Boolean usejc, Long id, Boolean ofPmt) {
        if(null==id)    return "IdNil";
        if(usejc) {  //监察平台
            if(ofPmt) {     //从PMT永久单位表找
                JcPermtUnt jcPermtUnt = jcPermtUntRepository.findById(id).orElse(null);
                if(null==jcPermtUnt)   return "非法单位id";
                if(null==jcPermtUnt.getUunit())       return jcPermtUnt.getFail();
            }else{   //福建监察平台对应id：从普通单位表查
                JcUntMge jcUntMge=jcUntMgeRepository.findById(id).orElse(null);
                if(null==jcUntMge)   return "非法单位id";
                if(null==jcUntMge.getUunit())       return jcUntMge.getFail();
            }
        }
        else {  //旧检验平台
            UntMge untMge=untMgeRepository.findById(id).orElse(null);
            if(null==untMge)   return "非法单位id";
            if(null==untMge.getUunit())       return untMge.getFail();
        }
        return "";
    }
    //旧平台的 设备使用场所--字典库 翻译成本平台的
    public String mapFromEqpUsePlace(Integer seq) {
        if(null==seq)   return null;
        int idx=seq.intValue()-1;
        if(idx>=0 && idx<plclsArr.length)   return plclsArr[idx];
        else   return null;
    }
    public static <T> int  arrayIndexOf(T[] arr, T val) {
        return Arrays.asList(arr).indexOf(val);
    }
    //设备使用场所 转为旧平台的id: 去除:{id:'11',text:'住宅'} 没意义的数据
    public Long mapToEqpUsePlace(String tag) {
        long iseq= arrayIndexOf(plclsArr, tag);
        if(iseq<0)  return null;
        else  return (iseq+1);
    }
    public String mapHashKey(PipingUnit who) {
        final char splchr = '☏';    //分割不混淆。
        StringBuilder  builder = new StringBuilder(128);
        builder.append(who.getRno()).append(splchr).append(who.getCode()).append(splchr).append(who.getStart())
                .append(splchr).append(who.getStop()).append(splchr).append(who.getName());
        return builder.toString();
    }
    //only调试的
    //泛型运行才报错的class java.util.UUID cannot be cast to class org.fjsei.yewu.index.EqpEs
    private  <T> void  test_DUMP1(List<T> pall) {
        pall.<EqpEs>forEach( one->{
            log.info("DUMP{},定期={};{}", ((EqpEs)one).getUseu().getId(), ((EqpEs)one).getNxtd2(),((EqpEs)one).getCod());
        });
    }
    //only调试的;  投影类不能直接用报错class com.sun.proxy.$Proxy411 cannot be cast to class md.specialEqp.inspect.Detail
    private  <T> void  test_DUMP2(List<T> pall) {
        pall.<DetailPi>forEach( one->{
            log.info("DUMPbus {},Isp;", ((DetailPi)one).getId() );
        });
    }
    /**分片作业+打包分组一次执行的--专门ES索引库的翻页ElasticsearchRepository：
     * EqpES和Eqp完全id等量一致的。避免ES更新直接把本轮的批作业结果登记进入关系数据库eqp? 干脆不要直接上后端输出日志标记清楚错误，正常就省略。
     * 【流式可打包】多个记录用串联过滤器合并为一个队列给后面函数。参数: packStream函数。
     * 目前用在EqpEs索引搜索，定检年检任务生成的下次检验日期有效的范围之内过滤设备查询+后续。
     【依然有问题】 /org/springframework/data/elasticsearch/client/elc/RequestConverter.java:1378
     query.getSearchAfter().stream().map(it -> FieldValue.of(it.toString())).collect(Collectors.toList()));
     SearchRequest-》 指定为date的字段会有一个附加属性format，如 “format”: “yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis” 不论 date 是什么展示格式，在 Elasticsearch 内部存储时都是转换成 UTC，并且把时区也会计算进去，从而得到 milliseconds-since-the-epoch 并作为存储的格式。
     Long类型的。 epoch_millis; date总是被显示为字符串格式, 即使他们在JSON文档中是long类型. 没有指定format则默认格式为: strict_date_optional_time || epoch_millis；
     当前转换日期模式，遇到null的日期字段，可能会导致分页异常，无法往前走了。
     * */
    public <T,P extends Node<UUID>> boolean  searchAfterEsP(JsliceMang jobm, NativeQuery query, Function<T, Triple<?,?,?>> sliceJob,CrudRepository repository1,
                                                            CrudRepository repository2, Class<P> type, BiFunction<P, P, Boolean> packStream,BiFunction<Object, Boolean, Object>  safterCvtFn){
        if(StringUtils.hasText(jobm.getSortat())){
            List<Object> afterObj= (List<Object>) safterCvtFn.apply(jobm.getSortat(), true);
//            List<Object> afterObj= JSON.parseObject(jobm.getSortat(), List.class);
            //spring-data-elasticsearch-5 只能支持FieldValue(Kind.String, value); 日期必须转换
//            queryBuilder.withSearchAfter(afterObj);
            query.setSearchAfter(afterObj);
        }
//        NativeSearchQuery searchQuery =queryBuilder.build();
//        Query query = queryBuilder.build();
//        IndexCoordinates indexCoordinates = esTemplate.getIndexCoordinatesFor(type);
        log.info("run:作业{},Off={}", jobm.getName(), jobm.getOffs());
        //可能遭遇：5,000 milliseconds timeout on connection http-outgoing-11报错的， 第一次比较慢,再次运行
//        SearchHits<P> searchHits = esTemplate.search(searchQuery, type, indexCoordinates);
        String queryDslStr= Objects.toString(query.getFilter(),"空");		//改成过滤器模式的 query3.getQuery().toString();
        SearchHits<P> searchHits = esTemplate.search(query, type);
        List<SearchHit<P>>  hits = searchHits.getSearchHits();
        //【必需】前提条件：pall必须是相似性排列的顺序
        List<P>  pall = (List<P>) SearchHitSupport.unwrapSearchHits(hits);    //保证pall需要和hits一一对应的排序关系&&个数一致
        //test_DUMP(pall);
        List rpl1=new ArrayList<>();
        List rpl2=new ArrayList<>();
        List<P>  packs=null;
        int  count=0;       //实际完成的实体扫描个数
        for ( ; count < pall.size(); ) {
            P parent = pall.get(count);
            P nextEnt = (count < pall.size()-1)? pall.get(count+1) : null;       //下一个的预先获取
            //顺序流可打包嵌入点： 队列中顺序搜寻允许打包的对象，一直到不相容的或者时仅剩最后一个实体了。 起初牟定对象。。。紧跟着相容对象；【必需】前提条件：pall必须是相似性排列的顺序。
            if(null!=nextEnt) {
                if(null==packs)   packs=new ArrayList<>();
                packs.add(parent);
                if(packStream.apply(parent, nextEnt)) {
                    count++;        //可连续打包的部分
                    continue;
                }
            }
            else{      //parent位于最后一个实体的：
                if(null==packs)   packs=new ArrayList<>();
                packs.add(parent);
            }
            //log.info("run:作业count{},packs包={}", count, packs!=null? packs.size():0);
            //现在处理当前的相容块：
            Triple<String, Object, Object> triple=null;
            Boolean execOK=true;
            try {
                //返回需要批量存储仓库的实体1，2： 泛型：编译可能通过，运行报错class org.fjsei.yewu.index.EqpEs cannot be cast to class org.fjsei.yewu.pojo.SliceSyncRes
                triple= (Triple<String, Object, Object>) sliceJob.apply((T) packs);
                Assert.isTrue(null!=triple,"必须triple");
                String result= triple.getVal1();
                //错误提示result，不代表绝对没有生成可用数据，有些仅仅算提示。
                if(StringUtils.hasText(result) )
                    log.info("作业{},{}问题={}",jobm.getName(), parent.getId(), result);

                Object enty1= triple.getVal2();
                if(null!=enty1) {       //允许返回List[]
                    if(enty1 instanceof List)   rpl1.addAll((Collection) enty1);
                    else    rpl1.add(enty1);
                }
                Object enty2= triple.getVal3();
                if(null!=enty2) {
                    if(enty2 instanceof List)   rpl2.addAll((Collection) enty2);
                    else    rpl2.add(enty2);
                }
            } catch (Exception e) {
                e.printStackTrace();
                execOK= false;
            }
            if(!execOK){
                log.error("报错暂停,该后退本次after:{}",jobm.getOffs()+count);       //ES深度翻页模式需依据sortAt而不是offs的。没必要更改sortAt#
                jobm.setDesc("报错暂停,该后退本次after");
                //取消批量存储？ 一整批抛弃
                return false;
            }
            count++;
            packs=null;     //一个打包块处理结束了。
            LocalDateTime now=java.time.LocalDateTime.now();
            java.time.Duration duration = java.time.Duration.between(jobm.getLast(), now);
            //怕时间太长，影响任务抢占了。但是分片运行有效时间太短了反而不容易有机会抢到。 才30秒。可调试的容易超时！！
            if(duration.toSeconds() > 30)
                break;
        }
        //利用到上一次的｛初始化时第一次的｝查询最后一个数据
        SearchHit lastsortHit = count>0? hits.get(count-1) : null;
        if(null!=lastsortHit) {
            //ES8报错了failed to parse date field [1694390400000] with format [uuuu-MM-dd]，日期
            String lastSort= (String) safterCvtFn.apply(lastsortHit.getSortValues(), false);
//            String lastSort = JSON.toJSONString(lastsortHit.getSortValues());
            assertTrue(!lastSort.equals(jobm.getSortat()), () -> "分页异常，无法往前走");
            jobm.setSortat(lastSort);           //成功才会修改的: 另外数据库还有事务机制。
        }
        //最后一批一起保存。应该把ES批量保存和数据库事务拆开，CRDB事务只管数据库的作业，ES保存时间要拖出去。
        if(null!=repository1)   repository1.saveAll(rpl1);   //居然5秒就保存超时失败导致分片任务终止
        if(null!=repository2)   repository2.saveAll(rpl2);
        //下一步报错invalid_index_name_exception, reason=Invalid index name [ee3-read,ev2-read,ec4-read,ep8-read,ea6-read,eb1-read,ef5-read,er9-read,eqp-read]
        //<EqpES>pagingAndSortingRepository.saveAll(pall);
        jobm.setOffs(jobm.getOffs() + count);
        return true;
    }
    /**分片批处理, 参数queryBuilder可设置过滤。过滤查询，却不能直接更新啊，有何意义？还得再去按ID查询一次。
     * <T>是基类 ； <P>投影类;  参数 prjType: 不能直接用基类: 报错Class-based DTOs are not yet supported;
     * */
    @Deprecated
    public <T,P extends Node<UUID>> boolean  searchAfter还得提取实体(JsliceMang jobm, NormalExecutor<T,UUID> applyRepository,
                                         Function<T, Triple<?,?,?>> sliceJob, CrudRepository repository1, CrudRepository repository2,
                                         Class<P> prjType, ComparablePath<UUID> entityIdQm, BooleanBuilder queryBuilder){
        Pageable pageable= PageOffsetFirst.of(0,jobm.getLimt(), Sort.by("id").ascending());
        UUID  afterObj=null;
        if(StringUtils.hasText(jobm.getSortat())){
            afterObj= JSON.parseObject(jobm.getSortat(), UUID.class);
        }
        BooleanBuilder builder = new BooleanBuilder();
        if(null!=afterObj) {
            BooleanBuilder idExpr= new BooleanBuilder();
            idExpr.and(entityIdQm.gt(afterObj));
            builder.and(idExpr).and(queryBuilder);
        }
        else
            builder.and(queryBuilder);
        //投影哪些字段?
        QBeanMy<?> piExp=new QBeanMy(Detail.class, entityIdQm );
        Slice<P> slice= applyRepository.findBy(piExp, builder, (query)-> {
            FluentQuery.FetchableFluentQuery<P> queryUse = query.as(prjType);
            return  queryUse.page(pageable);
        });
        List<P> pall= (List<P>) slice.getContent();
        List<UUID> ids= pall.stream().map(isppi->isppi.getId()).collect(Collectors.toList());
//        Slice<P> rpage= (Slice<P>)applyRepository.findAll(builder,pageable);
//        List<P>  pall = (List<P>) rpage.toList();
        //test_DUMP2(pall);
        List rpl1=new ArrayList<>();
        List rpl2=new ArrayList<>();
        int  count=0;       //实际完成的实体扫描个数
        for ( ; count < pall.size(); ) {
            P parentPrj = pall.get(count);
            UUID  id= parentPrj.getId();
            T parent= applyRepository.getReferenceById(id);
            //投影获得对象无法强制转JPA实体,错com.sun.proxy.$Proxy410 cannot be cast to class md.specialEqp.inspect.Detail
            //Detail realEnt= (Detail) parentPrj;
            Triple<String, Object, Object> triple=null;
            Boolean execOK=true;
            try {
                //返回需要批量存储仓库的实体1，2： 泛型：编译可能通过，运行报错class org.fjsei.yewu.index.EqpEs cannot be cast to class org.fjsei.yewu.pojo.SliceSyncRes
                triple= (Triple<String, Object, Object>) sliceJob.apply( parent);
                Assert.isTrue(null!=triple,"必须triple");
                String result= triple.getVal1();
                //投影类报错class com.sun.proxy.$Proxy411 cannot be cast to class org.fjsei.yewu.pojo.SliceSyncRes
                if(StringUtils.hasText(result) )
                    ((SliceSyncRes)parent).setFail(result);
                else
                    ((SliceSyncRes)parent).setFail("OK");
                Object enty1= triple.getVal2();
                if(null!=enty1) {       //允许返回List[]
                    if(enty1 instanceof List)   rpl1.addAll((Collection) enty1);
                    else    rpl1.add(enty1);
                }
                Object enty2= triple.getVal3();
                if(null!=enty2) {
                    if(enty2 instanceof List)   rpl2.addAll((Collection) enty2);
                    else    rpl2.add(enty2);
                }
            } catch (Exception e) {
                e.printStackTrace();
                execOK= false;
            }
            if(!execOK){
                log.error("报错暂停,该后退本次after:{}", jobm.getSortat());       //深度翻页模式需依据sortAt而不是offs的。没必要更改sortAt
                jobm.setDesc("报错暂停,该后退本次after");
                //取消批量存储？ 一整批抛弃
                return false;
            }
            count++;
            LocalDateTime now=java.time.LocalDateTime.now();
            java.time.Duration duration = java.time.Duration.between(jobm.getLast(), now);
            //怕时间太长，影响任务抢占了。但是分片运行有效时间太短了反而不容易有机会抢到。 才30秒。可调试的容易超时！！
            if(duration.toSeconds() > 30)
                break;
        }
        //利用到上一次的｛初始化时第一次的｝查询最后一个数据
        P  lastsortHit= count>0? pall.get(count-1) : null;
        if(null!=lastsortHit) {
            String lastSort = JSON.toJSONString(lastsortHit.getId());
            jobm.setSortat(lastSort);           //成功才会修改的: 另外数据库还有事务机制。
        }
        //最后一批一起保存。应该把ES批量保存和数据库事务拆开，CRDB事务只管数据库的作业，ES保存时间要拖出去。
        if(null!=repository1)   repository1.saveAll(rpl1);   //居然5秒就保存超时失败导致分片任务终止
        if(null!=repository2)   repository2.saveAll(rpl2);
        //纯粹给前端提示用的。Offs不跳动++，前端可能认为没法前进了。
        jobm.setOffs(jobm.getOffs() + count);
        return true;
    }
    /**【观察】争用激烈的表场合，用id漫步比不用id的也就是用limit offset的分页查询会快很多的。分片批处理, 参数queryBuilder可设置过滤。
     * 依据Id升序批处理：offset舍弃，改成sortAt临时存储到分片作业控制器，后续分片直接提取上一次ID最大值。
     * 假如不用QuerydslNcExecutor的话,真是会出现count()语句执行==严重影响性能。
     * <T>是基类 ； <P>投影类;
     * 参数 prjType: 不能直接用基类: 报错Class-based DTOs are not yet supported;
     * */
    public <T extends Node<UUID>,P extends Node<UUID>> boolean  searchAfter(JsliceMang jobm, NormalExecutor<T,UUID> applyRepository,
                                             Function<T, Triple<?,?,?>> sliceJob, CrudRepository repository1, CrudRepository repository2,
                                             Class<P> prjType){
        Pageable pageable= PageOffsetFirst.of(0,jobm.getLimt());
        UUID  afterObj=null;
        if(StringUtils.hasText(jobm.getSortat())){
            afterObj= JSON.parseObject(jobm.getSortat(), UUID.class);
        }
        Slice<P>  slice=null;
        if(null!=afterObj)
            slice= applyRepository.findAllByIdGreaterThanOrderByIdAsc(afterObj, pageable, prjType);
        else
            slice= applyRepository.findAllByOrderById(pageable, prjType);
        //Slice<P> slice= (Slice<P>) applyRepository.findAll(builder, pageable); 这必然出现count()语句
        //没法直接加Predicate predicate或者Specification<Detail> spec,  ;unitRepository.readAllBy(predicate, pageable, Unit.class);
        List<T> pall= (List<T>) slice.getContent();
        //test_DUMP2(pall);
        List rpl1=new ArrayList<>();
        List rpl2=new ArrayList<>();
        int  count=0;       //实际完成的实体扫描个数
        for ( ; count < pall.size(); ) {
            T parent = pall.get(count);
            //UUID  id= parentPrj.getId();
            //T parent= applyRepository.getReferenceById(id);  对于上面已经读取T=非投影的已经装入的不会读取数据库,否则会。
            Triple<String, Object, Object> triple=null;
            Boolean execOK=true;
            try {
                //返回需要批量存储仓库的实体1，2： 泛型：编译可能通过，运行报错class org.fjsei.yewu.index.EqpEs cannot be cast to class org.fjsei.yewu.pojo.SliceSyncRes
                triple= (Triple<String, Object, Object>) sliceJob.apply((T) parent);
                Assert.isTrue(null!=triple,"必须triple");
                resolveResFailSave(prjType, parent, triple, null, (res, tripleP, oparam) -> {
                    String result= tripleP.getVal1();
                    if(StringUtils.hasText(result) )
                        ((SliceSyncRes)parent).setFail(result);
                    else
                        ((SliceSyncRes)parent).setFail("OK");
                });
                //if (SliceSyncRes.class.isAssignableFrom(prjType)) {   }
                Object enty1= triple.getVal2();
                if(null!=enty1) {       //允许返回List[]
                    if(enty1 instanceof List)   rpl1.addAll((Collection) enty1);
                    else    rpl1.add(enty1);
                }
                Object enty2= triple.getVal3();
                if(null!=enty2) {
                    if(enty2 instanceof List)   rpl2.addAll((Collection) enty2);
                    else    rpl2.add(enty2);
                }
            } catch (Exception e) {
                e.printStackTrace();
                execOK= false;
            }
            if(!execOK){
                log.error("报错暂停,该后退本次after:{}", jobm.getSortat());       //深度翻页模式需依据sortAt而不是offs的。没必要更改sortAt
                jobm.setDesc("报错暂停,该后退本次after");
                //取消批量存储？ 一整批抛弃
                return false;
            }
            count++;
            LocalDateTime now=java.time.LocalDateTime.now();
            java.time.Duration duration = java.time.Duration.between(jobm.getLast(), now);
            //怕时间太长，影响任务抢占了。但是分片运行有效时间太短了反而不容易有机会抢到。 才30秒。可调试的容易超时！！
            if(duration.toSeconds() > 30)
                break;
        }
        //利用到上一次的｛初始化时第一次的｝查询最后一个数据
        T  lastsortHit= count>0? pall.get(count-1) : null;
        if(null!=lastsortHit) {
            String lastSort = JSON.toJSONString(lastsortHit.getId());
            jobm.setSortat(lastSort);           //成功才会修改的: 另外数据库还有事务机制。
        }
        //最后一批一起保存。应该把ES批量保存和数据库事务拆开，CRDB事务只管数据库的作业，ES保存时间要拖出去。
        if(null!=repository1)   repository1.saveAll(rpl1);   //居然5秒就保存超时失败导致分片任务终止
        if(null!=repository2)   repository2.saveAll(rpl2);
        //纯粹给前端提示用的。Offs不跳动++，前端可能认为没法前进了。
        jobm.setOffs(jobm.getOffs() + count);
        return true;
    }
    //ID类型=Long版本的
    public <T extends Node<Long>,P extends Node<Long>> boolean  searchAfterL(JsliceMang jobm, NormalExecutor<T,Long> applyRepository,
                                                                            Function<T, Triple<?,?,?>> sliceJob, CrudRepository repository1, CrudRepository repository2,
                                                                            Class<P> prjType){
        Pageable pageable= PageOffsetFirst.of(0,jobm.getLimt());
        Long  afterObj=null;
        if(StringUtils.hasText(jobm.getSortat())){
            afterObj= JSON.parseObject(jobm.getSortat(), Long.class);
        }
        Slice<P>  slice=null;
        if(null!=afterObj)
            slice= applyRepository.findAllByIdGreaterThanOrderByIdAsc(afterObj, pageable, prjType);
        else
            slice= applyRepository.findAllByOrderById(pageable, prjType);
        //Slice<P> slice= (Slice<P>) applyRepository.findAll(builder, pageable); 这必然出现count()语句
        //没法直接加Predicate predicate或者Specification<Detail> spec,  ;unitRepository.readAllBy(predicate, pageable, Unit.class);
        List<T> pall= (List<T>) slice.getContent();
        //test_DUMP2(pall);
        List rpl1=new ArrayList<>();
        List rpl2=new ArrayList<>();
        int  count=0;       //实际完成的实体扫描个数
        for ( ; count < pall.size(); ) {
            T parent = pall.get(count);
            //UUID  id= parentPrj.getId();
            //T parent= applyRepository.getReferenceById(id);  对于上面已经读取T=非投影的已经装入的不会读取数据库,否则会。
            Triple<String, Object, Object> triple=null;
            Boolean execOK=true;
            try {
                //返回需要批量存储仓库的实体1，2： 泛型：编译可能通过，运行报错class org.fjsei.yewu.index.EqpEs cannot be cast to class org.fjsei.yewu.pojo.SliceSyncRes
                triple= (Triple<String, Object, Object>) sliceJob.apply((T) parent);
                Assert.isTrue(null!=triple,"必须triple");
                resolveResFailSave(prjType, parent, triple, null, (res, tripleP, oparam) -> {
                    String result= tripleP.getVal1();
                    if(StringUtils.hasText(result) )
                        ((SliceSyncRes)parent).setFail(result);
                    else
                        ((SliceSyncRes)parent).setFail("OK");
                });
                //if (SliceSyncRes.class.isAssignableFrom(prjType)) {   }
                Object enty1= triple.getVal2();
                if(null!=enty1) {       //允许返回List[]
                    if(enty1 instanceof List)   rpl1.addAll((Collection) enty1);
                    else    rpl1.add(enty1);
                }
                Object enty2= triple.getVal3();
                if(null!=enty2) {
                    if(enty2 instanceof List)   rpl2.addAll((Collection) enty2);
                    else    rpl2.add(enty2);
                }
            } catch (Exception e) {
                e.printStackTrace();
                execOK= false;
            }
            if(!execOK){
                log.error("报错暂停,该后退本次after:{}", jobm.getSortat());       //深度翻页模式需依据sortAt而不是offs的。没必要更改sortAt
                jobm.setDesc("报错暂停,该后退本次after");
                //取消批量存储？ 一整批抛弃
                return false;
            }
            count++;
            LocalDateTime now=java.time.LocalDateTime.now();
            java.time.Duration duration = java.time.Duration.between(jobm.getLast(), now);
            //怕时间太长，影响任务抢占了。但是分片运行有效时间太短了反而不容易有机会抢到。 才30秒。可调试的容易超时！！
            if(duration.toSeconds() > 30)
                break;
        }
        //利用到上一次的｛初始化时第一次的｝查询最后一个数据
        T  lastsortHit= count>0? pall.get(count-1) : null;
        if(null!=lastsortHit) {
            String lastSort = JSON.toJSONString(lastsortHit.getId());
            jobm.setSortat(lastSort);           //成功才会修改的: 另外数据库还有事务机制。
        }
        //最后一批一起保存。应该把ES批量保存和数据库事务拆开，CRDB事务只管数据库的作业，ES保存时间要拖出去。
        if(null!=repository1)   repository1.saveAll(rpl1);   //居然5秒就保存超时失败导致分片任务终止
        if(null!=repository2)   repository2.saveAll(rpl2);
        //纯粹给前端提示用的。Offs不跳动++，前端可能认为没法前进了。
        jobm.setOffs(jobm.getOffs() + count);
        return true;
    }

    @FunctionalInterface
    public interface MultiConsumer<T, U, V> {
        void accept(T t, U u, V v);
    }
    //方法引用+函数式接口（运行时检查）：
   static public <T, U, V> void resolveResFailSave(Class<T> clazz, Object obj, U secondParam, V thirdParam, MultiConsumer<T, U, V> action) {
        if (SliceSyncRes.class.isAssignableFrom(clazz)) {
            @SuppressWarnings("unchecked")
            T castObj = (T) obj;
            action.accept(castObj, secondParam, thirdParam);    //存储库实体类型有做extends SliceSyncRes的才需要保存。
        } else {
            //不保存
        }
    }
    /**分片批处理, 参数queryBuilder可设置过滤。
     * 没有排序的存储库漫步和处理：offset + limt； 数据库处理在任务分片间并没有事务保障，会漏掉新插入的数据，重复性数据行出现，无顺序。
     * 假如不用QuerydslNcExecutor的话,真是会出现count()语句执行==严重影响性能。
     * <T>是基类 ； <P>投影类;
     * 参数 prjType: 不能直接用基类: 报错Class-based DTOs are not yet supported;
     * @param applyRepository 轮询的存储库；
     * @param sliceJob 具体作业
     * @param  repository1/2: 附加的实体等保存存储，可批量saveAll。
     * */
    public <T extends Node<UUID>,P extends Node<UUID>> boolean  strollPage(JsliceMang jobm, NormalExecutor<T,UUID> applyRepository,
                                                                            Function<T, Triple<?,?,?>> sliceJob, CrudRepository repository1, CrudRepository repository2,
                                                                            Class<P> prjType){
        Pageable pageable= PageOffsetFirst.of(jobm.getOffs(),jobm.getLimt());
        Slice<P>  slice=applyRepository.readAllBy(pageable, prjType);
        //Slice<P> slice= (Slice<P>) applyRepository.findAll(builder, pageable); 这必然出现count()语句
        //没法直接加Predicate predicate或者Specification<Detail> spec,  ;unitRepository.readAllBy(predicate, pageable, Unit.class);
        List<T> pall= (List<T>) slice.getContent();
        //test_DUMP2(pall);
        List rpl1=new ArrayList<>();
        List rpl2=new ArrayList<>();
        int  count=0;       //实际完成的实体扫描个数
        for ( ; count < pall.size(); ) {
            T parent = pall.get(count);
            //UUID  id= parentPrj.getId();
            //T parent= applyRepository.getReferenceById(id);  对于上面已经读取T=非投影的已经装入的不会读取数据库,否则会。
            Triple<String, Object, Object> triple=null;
            Boolean execOK=true;
            try {
                //返回需要批量存储仓库的实体1，2： 泛型：编译可能通过，运行报错class org.fjsei.yewu.index.EqpEs cannot be cast to class org.fjsei.yewu.pojo.SliceSyncRes
                triple= (Triple<String, Object, Object>) sliceJob.apply((T) parent);
                Assert.isTrue(null!=triple,"必须triple");
                resolveResFailSave(prjType, parent, triple, null, (res, tripleP, oparam) -> {
                    String result= tripleP.getVal1();
                    if(StringUtils.hasText(result) )
                        ((SliceSyncRes)parent).setFail(result);
                    else
                        ((SliceSyncRes)parent).setFail("OK");
                });
                //if (SliceSyncRes.class.isAssignableFrom(prjType)) {   }
                Object enty1= triple.getVal2();
                if(null!=enty1) {       //也能附带的批量处理的存储库变更保存； 允许返回List[]
                    if(enty1 instanceof List)   rpl1.addAll((Collection) enty1);
                    else    rpl1.add(enty1);
                }
                Object enty2= triple.getVal3();
                if(null!=enty2) {
                    if(enty2 instanceof List)   rpl2.addAll((Collection) enty2);
                    else    rpl2.add(enty2);
                }
            } catch (Exception e) {
                e.printStackTrace();
                execOK= false;
            }
            if(!execOK){
                log.error("报错暂停,该后退本次after:{}", jobm.getSortat());       //深度翻页模式需依据sortAt而不是offs的。没必要更改sortAt
                jobm.setDesc("报错暂停,该后退本次after");
                //取消批量存储？ 一整批抛弃
                return false;
            }
            count++;
            LocalDateTime now=java.time.LocalDateTime.now();
            java.time.Duration duration = java.time.Duration.between(jobm.getLast(), now);
            //【自带事务超时，30秒必须让出时间片】怕时间太长，影响任务抢占了。但是分片运行有效时间太短了反而不容易有机会抢到。 才30秒。可调试的容易超时！！
            if(duration.toSeconds() > 30)
                break;
        }
        //最后一批一起保存。应该把ES批量保存和数据库事务拆开，CRDB事务只管数据库的作业，ES保存时间要拖出去。
        if(null!=repository1)   repository1.saveAll(rpl1);   //居然5秒就保存超时失败导致分片任务终止
        if(null!=repository2)   repository2.saveAll(rpl2);
        //纯粹给前端提示用的。Offs不跳动++，前端可能认为没法前进了。
        jobm.setOffs(jobm.getOffs() + count);
        if(pall.size()<=0){
            jobm.setDesc("就没得前进");
            return false;
        }
        return true;
    }
}
